 1.Kafka高级特性解析之分区中修改分区副本
   
   实际项目中，我们可能由于主题的副本因子设置的问题，需要重新设置副本因子
   或者由于集群的扩展，需要重新设置副本因子。
   topic一旦使用又不能轻易删除重建，因此动态增加副本因子就成为最终的选择。
   说明：kafka1.0 版本配置文件默认没有default.replication.factor=x， 因此如果创建topic时，不指定–
replication-factor 想， 默认副本因子为1. 我们可以在⾃⼰的server.properties中配置上常⽤的副本因子，省去手动调
整。例如设置default.replication.factor=3， 详细内容可参考官方文档https://kafka.apache.org/documentation/#r
eplication
   原因分析：
   假设我们有2个kafka broker分别broker0，broker1。
   1).当我们创建的topic有2个分区partition时并且replication-factor为1，基本上一个broker上一个分区。当一
个broker宕机了，该topic就方法使用了，因为两个个分区只有一个能用。
   2).当我们创建的topic有3个分区partition时并且replication-factor为2时，可能分区数据分布情况是
   broker0， partiton0，partiton1，partiton2，
   broker1， partiton1，partiton0，partiton2，
  每个分区有一个副本，当其中一个broker宕机了，kafka集群还能完整凑出该topic的两个分区，例如当
broker0宕机了，可以通过broker1组合出topic的两个分区。
  1).创建主题
  [root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic
tp_re_02 --partitions 3 --replication-factor 1
  2).查看主题细节
  [root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --describe --
topic tp_re_02
Topic:tp_re_02 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
[root@linux121 ~]#
  3).修改副本因子：错误
  [root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --alter --topic 
  tp_re_02 --replication-factor 2
  Option "[replication-factor]" can't be used with option"[alter]"

  4).使用kafka-reassign-partitions.sh 修改副本因子：
  创建increment-replication-factor.json
{
   "version":1,
   "partitions":[
     {"topic":"tp_re_02","partition":0,"replicas":[0,1]},
     {"topic":"tp_re_02","partition":1,"replicas":[0,1]},
     {"topic":"tp_re_02","partition":2,"replicas":[1,0]}
   ]
}
  5).执行分配
  [root@linux121 ~]# kafka-reassign-partitions.sh --zookeeper linux121:2181/myKafka 
--reassignment-json-file increment-replication-factor.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"tp_re_02","partition":2,"replicas":
[0],"log_dirs":["any"]},{"topic":"tp_re_02","partition":1,"replicas":
[1],"log_dirs":["any"]},{"topic":"tp_re_02","partition":0,"replicas":
[0],"log_dirs":["any"]}]}
   Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions                                                                                                                                                                         
  6).查看主题细节
  [root@linux121 ~]# kafka-topics.sh --zookeeper linux121:2181/myKafka --describe --
topic tp_re_02
Topic:tp_re_02 PartitionCount:3 ReplicationFactor:2 Configs:
   Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0
   Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
   Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
[root@linux121 ~]#
  7).搞定!
 
 2.分区分配策略
   
   在Kafka中，每个Topic会包含多个分区，默认情况下一个分区只能被一个消费组下面的一个消费者消费，这里就
产生了分区分配的问题。Kafka中提供了多重分区分配算法（PartitionAssignor）的实现：RangeAssignor、
RoundRobinAssignor、StickyAssignor。
   1).RangeAssignor                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
   PartitionAssignor接口用于用户定义实现分区分配算法，以实现Consumer之间的分区分配。
   消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的一
个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采用RangeAssignor
的分配算法
   RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic，首先对分区按照分区ID进行数值排序，然
后订阅这个Topic的消费组的消费者再进行字典排序，之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡，因
为分区数可能无法被消费者数量整除，那么有一些消费者就会多分配到一些分区。
   大致算法如下：
assign(topic, consumers) {
   // 对分区和Consumer进行排序
   List<Partition> partitions = topic.getPartitions();
   sort(partitions);
   sort(consumers);
    // 计算每个Consumer分配的分区数
   int numPartitionsPerConsumer = partition.size() / consumers.size();
   // 额外有一些Consumer会多分配到分区
   int consumersWithExtraPartition = partition.size() % consumers.size();
   // 计算分配结果
   for (int i = 0, n = consumers.size(); i < n; i++) {
        // 第i个Consumer分配到的分区的index
       int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        // 第i个Consumer分配到的分区数
       int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
        // 分装分配结果
        assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start
+ length));
   }
}
   RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度，然后将分区按照跨度进
行平均分配，以保证分区尽可能均匀地分配给所有的消费者。对于每一个Topic，RangeAssignor策略会将消费组内所
有订阅这个Topic的消费者按照名称的字典序排序，然后为每个消费者划分固定的分区范围，如果不够平均分配，那么
字典序靠前的消费者会被多分配一个分区。
   这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加，不均衡的问题会越来越严重，比如上图中
4 个分区3个消费者的场景，C0会多分配一个分区。如果此时再订阅一个分区数为4的Topic，那么C0又会比C1、C2多
分配一个分区，这样C0总共就比C1、C2多分配两个分区了，而且随着Topic的增加，这个情况会越来越严重。
  字典序靠前的消费组中的消费者比较“贪婪”。
  2).RoundRobinAssignor
  RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽量均衡的分配
（RangeAssignor是针对单个Topic的分区进行排序分配的）。如果消费组内，消费者订阅的Topic列表是相同的（每
个消费者都订阅了相同的Topic），那么分配结果是尽量均衡的（消费者之间分配到的分区数的差值不会超过1）。如
果订阅的Topic列表是不同的，那么分配结果是不保证“尽量均衡”的，因为某些消费者不参与一些Topic的分配。
  相对于RangeAssignor，在订阅多个Topic的情况下，RoundRobinAssignor的方式能消费者之间尽量均衡的分配
到分区（分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多，差值越来
越大）。
  对于消费组内消费者订阅Topic不一致的情况：假设有两个个消费者分别为C0和C1，有2个Topic T1、T2，分别拥
有3和2个分区，并且C0订阅T1和T2，C1订阅T2，那么RoundRobinAssignor的分配结果如下：
  看上去分配已经尽量的保证均衡了，不过可以发现C0承担了4个分区的消费⽽C1订阅了T2一个分区，是不是把
T2P0交给C1消费能更加的均衡呢？
  3).StickyAssignor
  动机
  尽管RoundRobinAssignor已经在RangeAssignor上做了一些优化来更均衡的分配分区，但是在一些情况下依旧会
产生严重的分配偏差，比如消费组中订阅的Topic列表不相同的情况下。
  更核心的问题是无论是RangeAssignor，还是RoundRobinAssignor，当前的分区分配算法都没有考虑上一次的分
配结果。显然，在执行一次新的分配之前，如果能考虑到上一次分配的结果，尽量少的调整分区分配的变动，显然是
能节省很多开销的。
  目标
  从字面意义上看，Sticky是“粘性的”，可以理解为分配结果是带“粘性的”：
  (1).分区的分配尽量的均衡
  (2).每一次重分配的结果尽量与上一次分配结果保持一致
  当这两个目标发生冲突时，优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的，而第二个目
标才真正体现出StickyAssignor特性的。
  我们先来看预期分配的结构，后续再具体分析StickyAssignor的算法实现。
  例如：
  有3个Consumer：C0、C1、C2
  有4个Topic：T0、T1、T2、T3，每个Topic有2个分区
  所有Consumer都订阅了这4个分区
  StickyAssignor的分配结果如下图所示（增加RoundRobinAssignor分配作为对比）：
  如果消费者1宕机，则按照RoundRobin的方式分配结果如下：
  打乱从新来过，轮询分配：
  按照Sticky的方式：
  仅对消费者1分配的分区进行重分配，红线部分。最终达到均衡的目的。
  再举一个例子：
  有3个Consumer：C0、C1、C2
  3 个Topic：T0、T1、T2，它们分别有1、2、3个分区
  C0订阅T0；C1订阅T0、T1；C2订阅T0、T1、T2
  分配结果如下图所示：
  消费者0下线，则按照轮询的方式分配：
  按照Sticky方式分配分区，仅仅需要动的就是红线部分，其他部分不动。
  StickyAssignor分配方式的实现稍微复杂点儿，我们可以先理解图示部分即可。感兴趣的同学可以研究一下。
  4).自定义分配策略
  自定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口。
PartitionAssignor接口的定义如下：
Subscription subscription(Set<String> topics);
String name();
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription>
subscriptions);
void onAssignment(Assignment assignment);
class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
}
class Assignment {
    private final List<TopicPartition> partitions;
    private final ByteBuffer userData;
}
  PartitionAssignor接口中定义了两个内部类：Subscription和Assignment。
  Subscription类用来表示消费者的订阅信息，类中有两个属性：topics和userData，分别表示消费者所订阅topic
列表和用户自定义信息。PartitionAssignor接⼝通过subscription()方法来设置消费者⾃身相关的Subscription信息，
注意到此方法中只有一个参数topics，与Subscription类中的topics的相互呼应，但是并没有有关userData的参数体
现。为了增强用户对分配结果的控制，可以在subscription()方法内部添加一些影响分配的用户自定义信息赋予
userData，比如：权重、ip地址、host或者机架（rack）等等。
  再来说一下Assignment类，它是用来表示分配结果信息的，类中也有两个属性：partitions和userData，分别表
示所分配到的分区集合和用户自定义的数据。可以通过PartitionAssignor接⼝中的onAssignment()方法是在每个消费
者收到消费组leader分配结果时的回调函数，例如在StickyAssignor策略中就是通过这个方法保存当前的分配⽅案，以
备在下次消费组再平衡（rebalance）时可以提供分配参考依据。
  接口中的name()方法用来提供分配策略的名称，对于Kafka提供的3种分配策略而言，RangeAssignor对应的
protocol_name为“range”，RoundRobinAssignor对应的protocol_name为“roundrobin”，StickyAssignor对应的
protocol_name为“sticky”，所以自定义的分配策略中要注意命名的时候不要与已存在的分配策略发生冲突。这个命名
用来标识分配策略的名称，在后面所描述的加⼊消费组以及选举消费组leader的时候会有涉及。
  真正的分区分配方案的实现是在assign()⽅法中，⽅法中的参数metadata表示集群的元数据信息，而subscriptions表
示消费组内各个消费者成员的订阅信息，最终方法返回各个消费者的分配信息。
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class MyAssignor extends AbstractPartitionAssignor {

}
在使用时，消费者客户端需要添加相应的Properties参数，示例如下：
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
MyAssignor.class.getName());

  
  
  